Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: normalize all send request resource release into sr.done #1121

Merged
merged 13 commits into from
Oct 25, 2023

Conversation

tisonkun
Copy link
Member

@tisonkun tisonkun commented Oct 24, 2023

This refers to #1071. This follows up to #1120.

This patch contains two fixes with one refactor unsplittably.

  • Refactor - collect sendrequest cleanup logic to sr.done and replace it everywhere release sendrequests.
  • Fix 1 - only call txn.endSendOrAckOp at the last chunk finished/failed, this fixes [Bug][Producer]Inaccurate transaction endSendOrAckOp for chunked message #1060.
  • Fix 2 - blocking reserving resources in front at internalSendAck. Although it breaks chunked messages can pass the permits limitation one by one under DisableBlockIfQueueFull=false, but the Golang client logic is different from Java one, where Golang's internalSendAsync and internalSend do not run in the same thread. I'd prefer to regard the original manner a possible improvement but blocking require full permits should work well for most scenarios.

Together, this patch fixes #1043.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@tisonkun
Copy link
Member Author

Here is the final failure test -

2023-10-24T21:48:31.4618725Z time="2023-10-24T21:48:23Z" level=info msg="Created consumer" consumerID=1 name=uyats subscription=my-sub topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4619008Z     transaction_test.go:493: 
2023-10-24T21:48:31.4619674Z         	Error Trace:	/pulsar/pulsar-client-go/pulsar/transaction_test.go:493
2023-10-24T21:48:31.4620886Z         	Error:      	Expected nil, but got: &pulsar.Error{msg:"There are some operations that are not completed after the timeout.: TimeoutError", result:3}
2023-10-24T21:48:31.4621142Z         	Test:       	TestAckChunkMessage
2023-10-24T21:48:31.4622274Z time="2023-10-24T21:48:29Z" level=info msg="Closing consumer=1" consumerID=1 name=uyats subscription=my-sub topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4623394Z time="2023-10-24T21:48:29Z" level=info msg="Closed consumer" consumerID=1 name=uyats subscription=my-sub topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4624662Z time="2023-10-24T21:48:29Z" level=info msg="close consumer, exit reconnect" consumerID=1 name=uyats subscription=my-sub topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4625711Z time="2023-10-24T21:48:29Z" level=info msg="Closing producer" producerID=1 producer_name=test topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4626797Z time="2023-10-24T21:48:29Z" level=info msg="Closed producer" producerID=1 producer_name=test topic="persistent://public/default/my-topic-735233632"
2023-10-24T21:48:31.4627034Z --- FAIL: TestAckChunkMessage (5.83s)

@@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) {
// Create transaction and register the send operation.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
txn.(*transaction).registerSendOrAckOp()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do register in internalSendAsync. This call seems like a workaround we ever added as a HACK 🤣

@tisonkun
Copy link
Member Author

Should merge #1120 first

@tisonkun
Copy link
Member Author

tisonkun commented Oct 25, 2023

@gunli supposed we can still acquire permits with totalChunk size in DisableBlockIfQueueFull=true for fail fast. IMO it can be reasonable and align with Java client's manner. But the current impl is correct also. So I tend to factor out this attempt to a follow-up PR for investigating.

BlockingIfQueueFull is convenient for users, recover the manner in this patch.

@tisonkun tisonkun marked this pull request as ready for review October 25, 2023 07:49
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)

if sr.callback != nil || len(p.options.Interceptors) > 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent - sr.callback != nil is always true.

// Large messages will be split into 11 chunks, exceeding the length of pending queue
ID, err := producer.Send(context.Background(), &ProducerMessage{
_, err = producer.Send(ctx, &ProducerMessage{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for Fix 2.

@gunli
Copy link
Contributor

gunli commented Oct 25, 2023

I think it is OK, well done.

Copy link
Member

@nodece nodece left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tisonkun
Copy link
Member Author

Merging...

Thanks for your review!

Further improvement can be made as described above.

@tisonkun tisonkun merged commit 7c14fb7 into apache:master Oct 25, 2023
6 checks passed
@tisonkun tisonkun deleted the resourcefix branch October 25, 2023 11:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants